// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package plan

import (
	"context"
	"fmt"
	"math"
	"strings"

	"github.com/matrixorigin/matrixone/pkg/pb/plan"
	pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo"
	"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
	"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
	"github.com/matrixorigin/matrixone/pkg/vm/process"
)

const (
	JoinSideNone       int8 = 0
	JoinSideLeft            = 1 << 1
	JoinSideRight           = 1 << 2
	JoinSideBoth            = JoinSideLeft | JoinSideRight
	JoinSideMark            = 1 << 3
	JoinSideCorrelated      = 1 << 4
)

type ExpandAliasMode int8

const (
	NoAlias ExpandAliasMode = iota
	AliasBeforeColumn
	AliasAfterColumn
)

type TableDefType = plan.TableDef_DefType
type TableDef = plan.TableDef
type ColDef = plan.ColDef
type ObjectRef = plan.ObjectRef
type ColRef = plan.ColRef
type Stats = plan.Stats
type Const = plan.Literal
type MaxValue = plan.MaxValue
type Expr = plan.Expr
type Node = plan.Node
type RowsetData = plan.RowsetData
type Query = plan.Query
type Plan = plan.Plan
type Type = plan.Type
type Plan_Query = plan.Plan_Query
type Property = plan.Property
type TableDef_DefType_Properties = plan.TableDef_DefType_Properties
type PropertiesDef = plan.PropertiesDef
type ViewDef = plan.ViewDef
type PartitionByDef = plan.PartitionByDef
type ClusterByDef = plan.ClusterByDef
type OrderBySpec = plan.OrderBySpec
type FkColName = plan.FkColName
type ForeignKeyDef = plan.ForeignKeyDef
type ClusterTable = plan.ClusterTable
type PrimaryKeyDef = plan.PrimaryKeyDef
type IndexDef = plan.IndexDef
type SubscriptionMeta = plan.SubscriptionMeta
type Snapshot = plan.Snapshot
type SnapshotTenant = plan.SnapshotTenant

type CompilerContext interface {
	// Default database/schema in context
	DefaultDatabase() string
	// check if database exist
	DatabaseExists(name string, snapshot Snapshot) bool
	// get table definition by database/schema
	Resolve(schemaName string, tableName string, snapshot Snapshot) (*ObjectRef, *TableDef)
	// get table definition by table id
	ResolveById(tableId uint64, snapshot Snapshot) (*ObjectRef, *TableDef)
	// get the value of variable
	ResolveVariable(varName string, isSystemVar, isGlobalVar bool) (interface{}, error)
	// get the list of the account id
	ResolveAccountIds(accountNames []string) ([]uint32, error)
	// get the relevant information of udf
	ResolveUdf(name string, args []*Expr) (*function.Udf, error)
	// get the definition of primary key
	GetPrimaryKeyDef(dbName string, tableName string, snapshot Snapshot) []*ColDef
	// get needed info for stats by table
	Stats(obj *ObjectRef, snapshot Snapshot) (*pb.StatsInfo, error)
	// get origin sql string of the root
	GetRootSql() string
	// get username of current session
	GetUserName() string
	GetAccountId() (uint32, error)
	// GetContext get raw context.Context
	GetContext() context.Context
	// GetDatabaseId Get database id
	GetDatabaseId(dbName string, snapshot Snapshot) (uint64, error)

	GetProcess() *process.Process

	GetQueryResultMeta(uuid string) ([]*ColDef, string, error)
	SetBuildingAlterView(yesOrNo bool, dbName, viewName string)
	// is building the alter view or not
	// return: yes or no, dbName, viewName
	GetBuildingAlterView() (bool, string, string)
	GetStatsCache() *StatsCache
	GetSubscriptionMeta(dbName string, snapshot Snapshot) (*SubscriptionMeta, error)
	CheckSubscriptionValid(subName, accName string, pubName string) error
	SetQueryingSubscription(meta *SubscriptionMeta)
	GetQueryingSubscription() *SubscriptionMeta
	IsPublishing(dbName string) (bool, error)
	ResolveSubscriptionTableById(tableId uint64, pubmeta *SubscriptionMeta) (*ObjectRef, *TableDef)

	ResolveSnapshotWithSnapshotName(snapshotName string) (*Snapshot, error)
	CheckTimeStampValid(ts int64) (bool, error)
	//ReplacePlan replaces the plan of the EXECUTE by the plan generated by the PREPARE.
	//return
	//	the plan generated by the PREPARE
	//	the statement generated by the PREPARE
	ReplacePlan(execPlan *plan.Execute) (*plan.Plan, tree.Statement, error)

	GetSnapshot() *Snapshot
	SetSnapshot(snapshot *Snapshot)
	GetViews() []string
	SetViews(views []string)

	GetLowerCaseTableNames() int64
}

type Optimizer interface {
	Optimize(stmt tree.Statement) (*Query, error)
	CurrentContext() CompilerContext
}

type Rule interface {
	Match(*Node) bool                      // rule match?
	Apply(*Node, *Query, *process.Process) // apply the rule
}

// BaseOptimizer is base optimizer, capable of handling only a few simple rules
type BaseOptimizer struct {
	qry   *Query
	rules []Rule
	ctx   CompilerContext
}

type ViewData struct {
	Stmt            string
	DefaultDatabase string
}

type ExecType int

const (
	ExecTypeAP ExecType = iota
	ExecTypeTP
)

type ExecInfo struct {
	Typ        ExecType
	WithGPU    bool
	WithBigMem bool
	CnNumbers  int
}

type QueryBuilder struct {
	qry     *plan.Query
	compCtx CompilerContext

	ctxByNode    []*BindContext
	nameByColRef map[[2]int32]string

	tag2Table map[int32]*TableDef

	nextTag    int32
	nextMsgTag int32

	isPrepareStatement bool
	mysqlCompatible    bool
	haveOnDuplicateKey bool // if it's a plan contain onduplicate key node, we can not use some optmize rule
	isForUpdate        bool // if it's a query plan for update
	isRestore          bool

	deleteNode     map[uint64]int32 //delete node in this query. key is tableId, value is the nodeId of sinkScan node in the delete plan
	skipStats      bool
	optimizerHints *OptimizerHints
}

type OptimizerHints struct {
	pushDownLimitToScan        int
	pushDownTopThroughLeftJoin int
	pushDownSemiAntiJoins      int
	aggPushDown                int
	aggPullUp                  int
	removeEffectLessLeftJoins  int
	removeRedundantJoinCond    int
	optimizeLikeExpr           int
	optimizeDateFormatExpr     int
	determineHashOnPK          int
	sendMessageFromTopToScan   int
	determineShuffle           int
	blockFilter                int
	applyIndices               int
	runtimeFilter              int
	joinOrdering               int
}

type CTERef struct {
	defaultDatabase string
	isRecursive     bool
	ast             *tree.CTE
	maskedCTEs      map[string]bool
	snapshot        *Snapshot
}

type aliasItem struct {
	idx     int32
	astExpr tree.Expr
}

type BindContext struct {
	binder Binder

	cteByName              map[string]*CTERef
	maskedCTEs             map[string]bool
	normalCTE              bool
	initSelect             bool
	recSelect              bool
	finalSelect            bool
	unionSelect            bool
	recRecursiveScanNodeId int32
	isTryBindingCTE        bool

	cteName  string
	headings []string

	groupTag     int32
	aggregateTag int32
	projectTag   int32
	resultTag    int32
	sinkTag      int32
	windowTag    int32
	timeTag      int32
	sampleTag    int32

	groups     []*plan.Expr
	aggregates []*plan.Expr
	projects   []*plan.Expr
	results    []*plan.Expr
	windows    []*plan.Expr
	times      []*plan.Expr

	groupByAst     map[string]int32
	aggregateByAst map[string]int32
	sampleByAst    map[string]int32
	windowByAst    map[string]int32
	projectByExpr  map[string]int32
	timeByAst      map[string]int32

	timeAsts []tree.Expr

	aliasMap map[string]*aliasItem

	bindings       []*Binding
	bindingByTag   map[int32]*Binding //rel_pos
	bindingByTable map[string]*Binding
	bindingByCol   map[string]*Binding

	// for join tables
	bindingTree *BindingTreeNode

	isDistinct   bool
	isCorrelated bool
	hasSingleRow bool

	parent     *BindContext
	leftChild  *BindContext
	rightChild *BindContext

	defaultDatabase string

	forceWindows bool

	// sample function related.
	sampleFunc SampleFuncCtx

	tmpGroups []*plan.Expr

	snapshot *Snapshot
	// all view keys(dbName#viewName)
	views []string

	// lower is sys var lower_case_table_names
	lower int64
}

type NameTuple struct {
	table string
	col   string
}

type BindingTreeNode struct {
	using []NameTuple

	binding *Binding

	left  *BindingTreeNode
	right *BindingTreeNode
}

type Binder interface {
	BindExpr(tree.Expr, int32, bool) (*plan.Expr, error)
	BindColRef(*tree.UnresolvedName, int32, bool) (*plan.Expr, error)
	BindAggFunc(string, *tree.FuncExpr, int32, bool) (*plan.Expr, error)
	BindWinFunc(string, *tree.FuncExpr, int32, bool) (*plan.Expr, error)
	BindSubquery(*tree.Subquery, bool) (*plan.Expr, error)
	BindTimeWindowFunc(string, *tree.FuncExpr, int32, bool) (*plan.Expr, error)
	GetContext() context.Context
}

type baseBinder struct {
	sysCtx    context.Context
	builder   *QueryBuilder
	ctx       *BindContext
	impl      Binder
	boundCols []string
}

type DefaultBinder struct {
	baseBinder
	typ  Type
	cols []string
}

type UpdateBinder struct {
	baseBinder
	cols []*ColDef
}

type TableBinder struct {
	baseBinder
}

type WhereBinder struct {
	baseBinder
}

type GroupBinder struct {
	baseBinder
	selectList tree.SelectExprs
}

type HavingBinder struct {
	baseBinder
	insideAgg bool
}

type ProjectionBinder struct {
	baseBinder
	havingBinder *HavingBinder
}

type OrderBinder struct {
	*ProjectionBinder
	selectList tree.SelectExprs
}

type LimitBinder struct {
	baseBinder
}

type PartitionBinder struct {
	baseBinder
}

// SetBinder for 'set @var = expr'
type SetBinder struct {
	baseBinder
}

var _ Binder = (*TableBinder)(nil)
var _ Binder = (*WhereBinder)(nil)
var _ Binder = (*GroupBinder)(nil)
var _ Binder = (*HavingBinder)(nil)
var _ Binder = (*ProjectionBinder)(nil)
var _ Binder = (*LimitBinder)(nil)
var _ Binder = (*PartitionBinder)(nil)
var _ Binder = (*UpdateBinder)(nil)

var Sequence_cols_name = []string{"last_seq_num", "min_value", "max_value", "start_value", "increment_value", "cycle", "is_called"}

const (
	NotFound      int32 = math.MaxInt32
	AmbiguousName int32 = math.MinInt32
)

type Binding struct {
	tag            int32
	nodeId         int32
	db             string
	table          string
	tableID        uint64
	cols           []string
	colIsHidden    []bool
	types          []*plan.Type
	refCnts        []uint
	colIdByName    map[string]int32
	isClusterTable bool
	defaults       []string
}

const (
	maxLengthOfTableComment  int = 2048
	maxLengthOfColumnComment int = 1024
)

// fuzzy filter need to get partial unique key attrs name and its origin table name
// for Decimal type, we need colDef to get the scale
type OriginTableMessageForFuzzy struct {
	ParentTableName  string
	ParentUniqueCols []*ColDef
}

type MultiTableIndex struct {
	IndexAlgo string
	IndexDefs map[string]*plan.IndexDef
}

type RemapInfo struct {
	step           int32
	node           *plan.Node
	tip            string
	colRefCnt      map[[2]int32]int
	colRefBool     map[[2]int32]bool
	sinkColRef     map[[2]int32]int
	remapping      *ColRefRemapping
	interRemapping *ColRefRemapping
	srcExprIdx     int
}

func (info *RemapInfo) String() string {
	if info == nil {
		return "empty RemapInfo"
	}

	sb := strings.Builder{}
	sb.WriteString("colRefCnt:")
	for k, v := range info.colRefCnt {
		sb.WriteString(fmt.Sprintf("[%v : %v]", k, v))
	}
	sb.WriteString("colRefBool:")
	for k, v := range info.colRefBool {
		sb.WriteString(fmt.Sprintf("[%v : %v]", k, v))
	}
	sb.WriteString("sinkColRef:")
	for k, v := range info.sinkColRef {
		sb.WriteString(fmt.Sprintf("[%v : %v]", k, v))
	}
	sb.WriteString(info.remapping.String())
	sb.WriteString(info.interRemapping.String())

	return fmt.Sprintf(
		"step %d nodeId %d nodeType %s tip %s "+
			"%s "+
			"srcExprIdx %d ",
		info.step,
		info.node.NodeId,
		info.node.NodeType,
		info.tip,
		sb.String(),
		info.srcExprIdx,
	)
}
